package com.atguigu.flink.sql;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Created by Smexy on 2023/3/4
 */
public class Demo9_DefineTime
{
    public static void main(String[] args) {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);

        /*
                读文件，将文件中的时间引申为事件时间，获取一个处理时间，定义水印

                原始数据中的时间是: 2020-04-15 20:13:40.564 格式，就用  TIMESTAMP(3)
                                 2020-04-15 20:13:40 格式，就用  TIMESTAMP(0)

                                 1618989564564 ,就用 TIMESTAMP_LTZ(3)
                                 1618989564 ,就用 TIMESTAMP_LTZ(0)
         */
        String createTableSql = " create table t1 ( id string, ts bigint  , vc int , " +
            "                   et  as TO_TIMESTAMP_LTZ(ts,3)    ,   " +
            "                   pt as  PROCTIME() ," +
            "                   WATERMARK FOR et AS et - INTERVAL '1' SECOND   )" +
            " with ( " +
            " 'connector' = 'filesystem' ,   " +
            " 'path' =  'data/cep.txt' ,   " +
            "  'format' = 'csv' " +
            "      )                 ";

        tableEnvironment.executeSql(createTableSql);
        tableEnvironment.sqlQuery("select * from t1")
                        .execute()
                        .print();

    }
}
